# HRRP(高分辨率距离像算法)
## 算法概述
高分辨率距离像(High Resolution Range Profile, HRRP)是雷达信号处理中的核心技术,通过对雷达回波信号进行脉冲压缩和多普勒处理,获取目标在距离维上的散射点分布特征。HRRP具有以下特点:
- 距离高分辨:能够区分目标上相邻较近的散射中心
- 特征稳定:为目标识别提供物理特征描述
- 计算高效:相比二维成像计算复杂度较低
- 实时性强:适用于实时目标识别系统
## 代码来源
## MindSpore Signal+ 实现
将原有代码重构为基于计算图的MindSpore Cell,实现雷达信号处理流程模块化。
### 1 数据读取
在Python中使用MindSpore Signal+时,我们可以使用NumPy的fromfile接口读二进制格式文件,因此在Python代码开头需要导入NumPy库。
示例代码:
```python
def read_hrrp_data(filename, maichongshu, jln, rfftn):
"""
读取HRRP二进制数据文件并处理
"""
with open(filename, 'rb') as fp:
expected_size = maichongshu * jln * 2
data = np.fromfile(fp, dtype=np.float32)
if len(data) != expected_size:
raise ValueError(f"数据长度错误:期望{expected_size}个点,实际读取{len(data)}个点")
data = data.reshape(maichongshu, jln, 2)
datar = data[:, :, 0]
datai = data[:, :, 1]
# 创建填充0的完整数组
datar_full = np.zeros((maichongshu, rfftn), dtype=np.float32)
datai_full = np.zeros((maichongshu, rfftn), dtype=np.float32)
datar_full[:, :jln] = datar
datai_full[:, :jln] = datai
return datar_full, datai_full
```
读到的数据需要转换为复数形式,以便后续处理。
```python
datar, datai = read_hrrp_data("hrrp_400_512.dat", maichongshu, jln, rfftn)
input_data = np.zeros(datar.shape, dtype=np.complex64)
input_data = datar + 1j * datai
```
### 2 数据预处理
从算法整体分析,数据读取后到核心计算之前的步骤,主要是对相位补偿因子计算,Keystone变换参数预计算,泰勒窗函数预计算,循环移位索引预计算。这部分都是核心计算的前期准备,建议将这部分代码封装在`__init__`函数中完成,不放在`construct`函数中可以避免额外的开销,当实例化一个类时自动触发一次`__init__`函数。
示例代码:
```python
class Hrrp(nn.Cell):
def __init__(self, hangmc, liejl, f0, fs, B, mohuhalfshu, PRF, vdengxiao, mubiaojuli):
super(Hrrp, self).__init__()
self.hangmc = hangmc
self.liejl = liejl
self.f0 = f0
self.fs = fs
self.B = B
self.mohuhalfshu = mohuhalfshu
self.PRF = PRF
self.vdengxiao = vdengxiao
self.mubiaojuli = mubiaojuli
self.fft = mr.FFT()
self.ifft = mr.IFFT()
self.fftshift = mr.FFTShift()
self.abs = mr.ComplexAbs()
self.stoltsun = mr.Stoltsun(dim=0)
self.mul = ops.Mul()
self.exp = ops.Exp()
self.reduce_sum = ops.ReduceSum(keep_dims=True)
self.gather = ops.Gather()
wl = LC / self.f0
Ka = 2.0 * self.vdengxiao**2 / (wl * self.mubiaojuli)
mctime = (np.arange(self.hangmc) - self.hangmc * 0.5) / self.PRF
xiangwei = np.pi * Ka * mctime**2
self.phase_comp = np.exp(1j * xiangwei).astype(np.complex64)
self.phase_comp = self.phase_comp.reshape(self.hangmc, 1)
self.phase_comp = ms.Tensor(self.phase_comp)
fwn = self.hangmc
self.fwn = fwn
rnfft = Get2intm(self.liejl)
self.rnfft = rnfft
fwn_8 = (fwn + 7) // 8 * 8 # 8字节对齐
self.fwn_8 = fwn_8
self.xold = (np.arange(fwn_8, dtype=np.float32) - fwn / 2.0)[:, np.newaxis]
self.xold = np.broadcast_to(self.xold, (fwn_8, rnfft))
i_arr = np.arange(rnfft, dtype=np.float32)
sigma_arr = (f0 + (i_arr - rnfft * 0.5) * fs / rnfft) / f0
self.xnew = self.xold * sigma_arr
self.xold = ms.Tensor(self.xold)
self.xnew = ms.Tensor(self.xnew)
self.mchtr = self.TaiLeWindow(self.hangmc, 50)
window_2d = self.mchtr[:, np.newaxis] # 转换为列向量便于广播
self.window_2d = ms.Tensor(window_2d, dtype=ms.float32)
self.j_constant = ms.Parameter(ms.Tensor(1.0j, dtype=ms.complex64), name="j_constant")
self._precompute_cirshift_indices()
def _precompute_cirshift_indices(self):
rnfft = self.rnfft
shift = self.rnfft // 2
indices = np.arange(rnfft, dtype=np.int32)
indices_shifted = (indices - shift) % rnfft
self.cirshift_indices = ms.Tensor(indices_shifted, dtype=ms.int32)
def construct(self, input_data):
...
```
#### 2.1 相位补偿因子计算
```python
wl = LC / self.f0 # 计算波长
Ka = 2.0 * self.vdengxiao**2 / (wl * self.mubiaojuli) # 调频斜率
mctime = (np.arange(self.hangmc) - self.hangmc * 0.5) / self.PRF # 时间轴
xiangwei = np.pi * Ka * mctime**2 # 相位变化量
self.phase_comp = np.exp(1j * xiangwei).astype(np.complex64) # 相位补偿因子
```
功能说明:
- 计算二次相位误差补偿因子,用于校正由于目标运动引起的相位畸变
- 将连续时间计算转换为离散时间序列,便于数字信号处理
- 相位补偿因子形状为 (hangmc, 1),便于广播到所有距离单元
#### 2.2 Keystone变换参数预计算
```python
# 关键参数计算
fwn = self.hangmc
self.fwn = fwn
rnfft = Get2intm(self.liejl) # 获取2的幂次方长度
self.rnfft = rnfft
fwn_8 = (fwn + 7) // 8 * 8 # 8字节对齐优化
self.fwn_8 = fwn_8
# 方位向坐标轴生成
self.xold = (np.arange(fwn_8, dtype=np.float32) - fwn / 2.0)[:, np.newaxis]
self.xold = np.broadcast_to(self.xold, (fwn_8, rnfft))
# 距离频率轴生成和缩放因子计算
i_arr = np.arange(rnfft, dtype=np.float32)
sigma_arr = (f0 + (i_arr - rnfft * 0.5) * fs / rnfft) / f0
self.xnew = self.xold * sigma_arr
# 转换为MindSpore Tensor
self.xold = ms.Tensor(self.xold)
self.xnew = ms.Tensor(self.xnew)
```
参数设计原理:
| 参数 | 计算公式 | 物理意义 |
|-----------| ---------------------- | -------------------- |
| xold | (i - fwn/2.0) | 方位向原始坐标,零中心化 |
| sigma_arr | (f0 + (i - rnfft/2)*fs/rnfft)/f0 | 距离徙动校正因子 |
| xnew | xold * sigma_arr | Keystone变换后的坐标 |
#### 2.3 泰勒窗函数预计算
```python
self.mchtr = self.TaiLeWindow(self.hangmc, 50)
window_2d = self.mchtr[:, np.newaxis]
self.window_2d = ms.Tensor(window_2d, dtype=ms.float32)
def TaiLeWindow(self, N, param2):
# 参数处理
N = max(N, 1)
nLevel = 5
nsll = 45 if param2 < 13.0 else param2
# 参数转换
mB = math.pow(10.0, nsll / 20.0)
mA = math.log(mB + math.sqrt(mB * mB - 1.0)) / math.pi
msquaQ = (nLevel * nLevel) / (mA * mA + (nLevel - 0.5) * (nLevel - 0.5))
# 计算窗函数系数
m_dFm = [0.0] * nLevel
for m in range(nLevel - 1):
mtmpa = 1.0
mtmpb = 1.0
for i in range(nLevel - 1):
# 计算第一个乘积项
term = 1 - (m+1)**2 / (msquaQ * (mA**2 + (i+0.5)**2))
mtmpa *= term
# 跳过相同索引
if m == i:
continue
# 计算第二个乘积项
term2 = 1 - (m+1)**2 / ((i+1)**2)
mtmpb *= term2
# 计算系数值
m_dFm[m] = (pow(-1, m+2) * mtmpa) / (2.0 * mtmpb)
# 计算窗函数值
h = np.zeros(N)
for i in range(N):
dwin = 0.0
for m in range(nLevel - 1):
angle = 2 * math.pi * (m+1) * (i - N/2.0 + 0.5) / N
dwin += m_dFm[m] * math.cos(angle)
h[i] = 1.0 + 2.0 * dwin
# 归一化处理
mean_val = np.mean(h)
if abs(mean_val) > 1e-15:
h /= mean_val
return h
```
窗函数作用:
- 减少频谱泄露,提高频率分辨率
- 抑制旁瓣,提高主瓣分辨率
- 参数50表示旁瓣抑制比约为50dB
#### 2.4 循环移位索引预计算
```python
def _precompute_cirshift_indices(self):
rnfft = self.rnfft
shift = self.rnfft // 2 # 中心频率点索引
indices = np.arange(rnfft, dtype=np.int32)
indices_shifted = (indices - shift) % rnfft
self.cirshift_indices = ms.Tensor(indices_shifted, dtype=ms.int32)
```
循环移位的作用:
- 频谱中心化:将零频分量移动到频谱中心
- 视觉优化:便于频谱显示和分析
- 算法要求:某些算法(如IFFT)要求输入是中心对称的
### 3 核心计算
步骤分解:
1. 相位补偿:首先对输入数据(复数形式)进行相位补偿,乘以预计算好的相位补偿因子(用于校正二次相位误差)。
2. 多模糊数处理:计算需要搜索的模糊数总数(mohushu = 2 * mohuhalfshu + 1),然后对每一个模糊数进行以下操作:
a. 计算当前模糊数对应的偏移量(offset = mh - mohuhalfshu),这个偏移量将用于Keystone变换中的相位补偿。
b. 进行Keystone变换(Keystone_interp),这是HRRP算法的核心,用于校正距离徙动。
c. 对Keystone变换后的数据,进行HRRP生成(Gethrrp),得到每一距离单元的幅度和相位信息,同时返回处理后的数据(result)和幅度数据(hrrp)。
d. 计算该HRRP的熵(Gethrrp_v2shang),熵值越小表示能量越集中,成像质量越好。
e. 将当前模糊数对应的结果(result)和熵值(shang_value)分别保存到列表results和shang中。
3. 选择最优结果:将所有模糊数对应的结果堆叠起来(使用ops.cat),然后找到熵最小的那个索引(xuhao = ops.argmin(shang)),并取出该索引对应的结果(min_result = results[xuhao])。
4. 更新输出:将最优结果的第一行(即HRRP处理后的第一行,包含了最优的相位和幅度信息)与原始输入数据的第二行到最后一行拼接起来,形成最终的输出。这里之所以只替换第一行,是因为在HRRP处理中,我们只关心每个距离单元的最强散射点,而原始数据的第一行被用来存储这个信息。
核心算法整体流程:
```python
class Hrrp(nn.Cell):
def __init__(self, hangmc, liejl, f0, fs, B, mohuhalfshu, PRF, vdengxiao, mubiaojuli):
super(Hrrp, self).__init__()
self.hangmc = hangmc
self.f0 = f0
self.fs = fs
self.B = B
self.mohuhalfshu = mohuhalfshu
self.PRF = PRF
self.vdengxiao = vdengxiao
self.mubiaojuli = mubiaojuli
...
wl = LC / self.f0
Ka = 2.0 * self.vdengxiao**2 / (wl * self.mubiaojuli)
mctime = (np.arange(self.hangmc) - self.hangmc * 0.5) / self.PRF
xiangwei = np.pi * Ka * mctime**2
self.phase_comp = np.exp(1j * xiangwei).astype(np.complex64)
self.phase_comp = self.phase_comp.reshape(self.hangmc, 1)
self.phase_comp = ms.Tensor(self.phase_comp)
...
def construct(self, input_data):
out = input_data * self.phase_comp
mohushu = 2 * self.mohuhalfshu + 1
results = []
shang = []
for mh in range(mohushu):
offset = mh - self.mohuhalfshu
tmp = self.Keystone_interp(out, self.f0, self.fs, self.B, offset)
hrrp, result = self.Gethrrp(tmp, self.hangmc, self.liejl)
shang_value = self.Gethrrp_v2shang(hrrp)
shang.append(shang_value)
result = ops.expand_dims(result, 0)
results.append(result)
results = ops.cat(results, axis=0)
shang = ops.cat(shang)
xuhao = ops.argmin(shang)
min_result = results[xuhao]
out = ops.cat([min_result[0:1, :], out[1:, :]], axis=0)
return out
```
#### 3.1 相位补偿
```python
out = input_data * self.phase_comp
```
物理意义:
- 补偿目标:校正由于雷达与目标相对运动引起的二次相位误差
- 误差来源:目标斜距变化引起的时间延迟变化
- 数学形式:补偿因子为 exp(j·π·Ka·t²),其中Ka为调频斜率
#### 3.2 多模糊数处理循环
1. 模糊数范围确定:
```python
mohushu = 2 * self.mohuhalfshu + 1
```
设计原理:
- 由于多普勒频率模糊,真实的调频斜率有多个候选值
- 搜索范围:[-mohuhalfshu, +mohuhalfshu]
- 典型设置:mohuhalfshu=5 → 搜索11个候选值
2. 循环处理每个模糊数:
```python
for mh in range(mohushu):
offset = mh - self.mohuhalfshu # 当前模糊数偏移
tmp = self.Keystone_interp(out, self.f0, self.fs, self.B, offset)
hrrp, result = self.Gethrrp(tmp, self.hangmc, self.liejl)
shang_value = self.Gethrrp_v2shang(hrrp)
shang.append(shang_value)
result = ops.expand_dims(result, 0)
results.append(result)
```
处理流程分解:
|步骤 |功能 | 关键操作 |
|--------------| ----------- | -------------------- |
|Keystone变换 |距离徙动校正 | 使用当前模糊数进行插值 |
|HRRP生成 |距离像提取 | 方位向FFT,提取峰值 |
|熵计算 | 质量评估 | 计算HRRP的熵值 |
#### 3.3 Keystone变换详解
1. 变换原理
Keystone变换是一种距离徙动校正算法,用于解决以下问题:
变换公式:
原始坐标:t (方位时间), f (距离频率)
变换后:t' = t × (f₀/(f₀+f))
2. 代码实现
```python
# 1. 距离向FFT(转到距离频率域)
# 2. 相位补偿(补偿距离徙动),方位向插值(Keystone变换核心)
# 3. 距离向IFFT(转回时域)
def Keystone_interp(self, input_data, f0, fs, B, moshuzhouqishu):
# 获取维度
fwn = self.fwn
fwn_8 = self.fwn_8
# 计算距离FFT长度
rnfft = self.rnfft
# 模块1: 距离向FFT
out = self.FFT4f(input_data, 1)
nyquist_idx = rnfft//2
new_nyquist_values = (out[:, nyquist_idx-1] + out[:, nyquist_idx+1]) / 2.0
# 使用concat操作替换特定列
left_part = out[:, :nyquist_idx]
nyquist_part = new_nyquist_values.expand_dims(1) # 增加维度
right_part = out[:, nyquist_idx+1:]
# 重新拼接
out = ops.concat([left_part, nyquist_part, right_part], axis=1)
# 计算无小点数量
wuxiaodiansujl = int(np.floor(0.5 * (fs - B) * rnfft / fs))
wuxiaodiansujl = (wuxiaodiansujl >> 3) << 3 # 8字节对齐
wuxiaodiansujl = max(0, wuxiaodiansujl)
# 模块2: 方位向插值
pindian = (ops.arange(0, rnfft, dtype=ms.float32) - 0.5 * rnfft) * fs / rnfft
j_indices = ops.arange(0, fwn, dtype=ms.float32).reshape(-1, 1)
angle = -2 * PI * j_indices * pindian / f0 * moshuzhouqishu
complex_angle = angle.astype(ms.complex64)
j_times_angle = complex_angle * self.j_constant
phase_comp_all = self.exp(j_times_angle)
temp_complex_top = out * phase_comp_all
temp_complex = ops.Pad(((0, fwn_8 - fwn), (0, 0)))(temp_complex_top)
temp_complex_valid = temp_complex[:, wuxiaodiansujl:rnfft - wuxiaodiansujl]
xnew_valid = self.xnew[:, wuxiaodiansujl:rnfft - wuxiaodiansujl]
xold_valid = self.xold[:, wuxiaodiansujl:rnfft - wuxiaodiansujl]
tmp = self.stoltsun(temp_complex_valid, xnew_valid, xold_valid)
update_part = tmp
left_part = out[:fwn, :wuxiaodiansujl]
right_part = out[:fwn, rnfft - wuxiaodiansujl:]
# 在列方向拼接三部分
out = ops.concat([left_part, update_part, right_part], axis=1)
# 模块3: 距离向IFFT
out = self.cirshift_optimized(out)
out = self.IFFT4f(out, 0)
return out
```
#### 3.4 HRRP生成
```python
# 1. 方位向加窗(泰勒窗)
# 2. 方位向FFT(转到多普勒域)
# 3. 峰值提取(每个距离单元的幅度)
# 4. 返回HRRP幅度和复数值
def Gethrrp(self, input_data, hangmc, liejln):
afftn = Get2intm(hangmc)
# 只取有效数据部分进行加窗处理
windowed_data = input_data[:hangmc, :liejln] * self.window_2d
windowed_data = windowed_data.astype(ms.complex64)
windowed_data_t = ops.transpose(windowed_data, (1, 0))
padding = ((0, 0), (0, afftn - windowed_data_t.shape[1]))
fft_input = ops.Pad(paddings=padding)(windowed_data_t)
fft_ret = self.FFT4f(fft_input, 0)
# 计算幅度平方 - 向量化操作
# fd = self.abs(fft_ret)**2
abs_result = ops.cast(ops.abs(fft_ret), ms.float32)
fd = abs_result**2
# 找每列的峰值位置 - 向量化操作
maxp_indices = ops.argmax(fd, dim=1)
# 提取峰值幅度 - 向量化操作
row_indices = ops.arange(liejln, dtype=ms.int32)
hrrp = fd[row_indices, maxp_indices]
peak_values = fft_ret[row_indices, maxp_indices]
peak_values_2d = ops.expand_dims(peak_values, 0)
result = ops.cat([peak_values_2d, input_data[1:, :]], axis=0)
return hrrp, result
```
HRRP定义:
- 高分辨率距离像 = 目标在距离维的散射点分布
- 物理意义:目标的"电磁指纹",用于目标识别
#### 3.5 熵值评估
```python
def Gethrrp_v2shang(self, x):
sum_sq = self.reduce_sum(x**2) # 总能量
is_nan = sum_sq != sum_sq
# 检查无效情况
is_valid = ops.logical_and(sum_sq > 1e-10, ops.logical_not(is_nan))
# 归一化并计算熵
t = x / (sum_sq)
log_t = ops.log(t)
entropy = -self.reduce_sum(t * log_t) # 香农熵
entropy = entropy.reshape(1)
# 如果无效则返回0
result = ops.where(is_valid, entropy, ops.zeros(1, ms.float32))
result = result.reshape(1)
return result
```
熵的物理意义:
|HRRP特征 |熵值表现 |物理解释|
|----------|------------|--------|
|能量集中 |低熵值 |少数强散射点,成像清晰|
|能量分散 |高熵值 |多个弱散射点,成像模糊|
|噪声影响 |高熵值 |背景噪声使能量分散|
熵值选择准则:熵值越小 → 能量越集中 → 成像质量越好 → 模糊数越准确
#### 3.6 最优结果选择
```python
# 合并所有结果
results = ops.cat(results, axis=0) # [mohushu, hangmc, liejl]
shang = ops.cat(shang) # [mohushu]
# 找到最小熵对应的索引
xuhao = ops.argmin(shang) # 最优模糊数索引
# 选择最优结果
min_result = results[xuhao] # 最优HRRP结果
out = ops.cat([min_result[0:1, :], out[1:, :]], axis=0) # 更新最终结果
```
### 4 数据后处理
这里将结果导出成二进制数据的dat文件,用于和正确结果比较。确认结果无误后,可通过 ``mindspore.export`` 导出 MINDIR 模型,便于在 MindSpore Lite 端部署(板卡侧运行)。
## 板卡部署
模型部署建议使用 ``YHFT-IDE``,它集成了模型转换、模型可视化与 MindSpore Lite 端部署模板。具体使用方法可参考 {ref}`HelloDSP MindSpore Lite端 `。
以下是完整的MindSpore Signal+实现的代码:
```python
import mindspore as ms
import numpy as np
from mindspore import nn, ops
import math
import mindradar as mr
def read_hrrp_data(filename, maichongshu, jln, rfftn):
# 打开二进制文件并读取所有数据
with open(filename, 'rb') as fp:
# 计算预期数据量并读取
expected_size = maichongshu * jln * 2 # 每个复数点包含实部和虚部
data = np.fromfile(fp, dtype=np.float32)
# 验证数据完整性
if len(data) != expected_size:
raise ValueError(f"数据长度错误:期望{expected_size}个点,实际读取{len(data)}个点")
# 重塑数据为3D数组 [脉冲索引, 数据点, 实部/虚部]
data = data.reshape(maichongshu, jln, 2)
# 分离实部和虚部
datar = data[:, :, 0] # 实部 [maichongshu, jln]
datai = data[:, :, 1] # 虚部 [maichongshu, jln]
# 创建填充0的完整数组
datar_full = np.zeros((maichongshu, rfftn), dtype=np.float32)
datai_full = np.zeros((maichongshu, rfftn), dtype=np.float32)
# 复制有效数据部分
datar_full[:, :jln] = datar
datai_full[:, :jln] = datai
return datar_full, datai_full
def Get2intm(intnumb):
# 验证输入有效性
if not isinstance(intnumb, int) or intnumb <= 0:
raise ValueError("Input must be a positive integer")
# 特殊处理1的情况(log2(1)=0)
if intnumb == 1:
return 1
# 使用更精确的log2计算
log2_val = math.log2(intnumb)
# 向上取整获取指数
nn = math.ceil(log2_val)
# 计算2的nn次方
result = 2 ** nn
return int(result) # 转换为整数类型
def write_hrrp_binary_v2(filename, datar, datai, maichongshu, jln):
# 参数强验证
if datar.shape != (maichongshu, 512) or datai.shape != (maichongshu, 512):
raise ValueError(f"数组维度必须为 ({maichongshu}, 512)")
# 创建交错存储矩阵
interleaved = np.empty((maichongshu, jln*2), dtype=np.float32)
# 精确截取前2560列
interleaved[:, ::2] = datar[:, :jln] # 实部
interleaved[:, 1::2] = datai[:, :jln] # 虚部
# 原子写入操作
interleaved.tofile(filename)
class Hrrp(nn.Cell):
def __init__(self, hangmc, liejl, f0, fs, B, mohuhalfshu, PRF, vdengxiao, mubiaojuli):
super(Hrrp, self).__init__()
self.hangmc = hangmc
self.liejl = liejl
self.f0 = f0
self.fs = fs
self.B = B
self.mohuhalfshu = mohuhalfshu
self.PRF = PRF
self.vdengxiao = vdengxiao
self.mubiaojuli = mubiaojuli
self.fft = mr.FFT()
self.ifft = mr.IFFT()
self.fftshift = mr.FFTShift()
self.stoltsun = mr.Stoltsun(dim=0)
self.mul = ops.Mul()
self.exp = ops.Exp()
self.reduce_sum = ops.ReduceSum(keep_dims=True)
self.gather = ops.Gather()
wl = LC / self.f0
Ka = 2.0 * self.vdengxiao**2 / (wl * self.mubiaojuli)
mctime = (np.arange(self.hangmc) - self.hangmc * 0.5) / self.PRF
xiangwei = np.pi * Ka * mctime**2
self.phase_comp = np.exp(1j * xiangwei).astype(np.complex64)
self.phase_comp = self.phase_comp.reshape(self.hangmc, 1)
self.phase_comp = ms.Tensor(self.phase_comp)
fwn = self.hangmc
self.fwn = fwn
rnfft = Get2intm(self.liejl)
self.rnfft = rnfft
fwn_8 = (fwn + 7) // 8 * 8 # 8字节对齐
self.fwn_8 = fwn_8
self.xold = (np.arange(fwn_8, dtype=np.float32) - fwn / 2.0)[:, np.newaxis]
self.xold = np.broadcast_to(self.xold, (fwn_8, rnfft))
i_arr = np.arange(rnfft, dtype=np.float32)
sigma_arr = (f0 + (i_arr - rnfft * 0.5) * fs / rnfft) / f0
self.xnew = self.xold * sigma_arr
self.xold = ms.Tensor(self.xold)
self.xnew = ms.Tensor(self.xnew)
self.mchtr = self.TaiLeWindow(self.hangmc, 50)
window_2d = self.mchtr[:, np.newaxis] # 转换为列向量便于广播
self.window_2d = ms.Tensor(window_2d, dtype=ms.float32)
self.j_constant = ms.Parameter(ms.Tensor(1.0j, dtype=ms.complex64), name="j_constant")
self._precompute_cirshift_indices()
def _precompute_cirshift_indices(self):
rnfft = self.rnfft
shift = self.rnfft // 2
# 创建索引数组 [0, 1, 2, ..., rnfft-1]
indices = np.arange(rnfft, dtype=np.int32)
# 计算移位后的索引
# 对于左移shift:new_index = (old_index - shift) mod rnfft
indices_shifted = (indices - shift) % rnfft
# 转换为Tensor
self.cirshift_indices = ms.Tensor(indices_shifted, dtype=ms.int32)
def construct(self, input_data):
out = input_data * self.phase_comp
mohushu = 2 * self.mohuhalfshu + 1
results = []
shang = []
for mh in range(mohushu):
offset = mh - self.mohuhalfshu
tmp = self.Keystone_interp(out, self.f0, self.fs, self.B, offset)
hrrp, result = self.Gethrrp(tmp, self.hangmc, self.liejl)
shang_value = self.Gethrrp_v2shang(hrrp)
shang.append(shang_value)
result = ops.expand_dims(result, 0)
results.append(result)
results = ops.cat(results, axis=0)
shang = ops.cat(shang)
xuhao = ops.argmin(shang)
min_result = results[xuhao]
# 将min_result的第一行和out的第二行到最后一行连接起来
out = ops.cat([min_result[0:1, :], out[1:, :]], axis=0)
return out
def TaiLeWindow(self, N, param2):
"""
泰勒窗函数实现
参数:
N: 窗口长度
param2: 加权分贝值 (20~50)
返回:
归一化的窗函数数组
"""
# 参数处理
N = max(N, 1)
nLevel = 5
nsll = 45 if param2 < 13.0 else param2
# 参数转换
mB = math.pow(10.0, nsll / 20.0)
mA = math.log(mB + math.sqrt(mB * mB - 1.0)) / math.pi
msquaQ = (nLevel * nLevel) / (mA * mA + (nLevel - 0.5) * (nLevel - 0.5))
# 计算窗函数系数
m_dFm = [0.0] * nLevel
for m in range(nLevel - 1):
mtmpa = 1.0
mtmpb = 1.0
for i in range(nLevel - 1):
# 计算第一个乘积项
term = 1 - (m+1)**2 / (msquaQ * (mA**2 + (i+0.5)**2))
mtmpa *= term
# 跳过相同索引
if m == i:
continue
# 计算第二个乘积项
term2 = 1 - (m+1)**2 / ((i+1)**2)
mtmpb *= term2
# 计算系数值
m_dFm[m] = (pow(-1, m+2) * mtmpa) / (2.0 * mtmpb)
# 计算窗函数值
h = np.zeros(N)
for i in range(N):
dwin = 0.0
for m in range(nLevel - 1):
angle = 2 * math.pi * (m+1) * (i - N/2.0 + 0.5) / N
dwin += m_dFm[m] * math.cos(angle)
h[i] = 1.0 + 2.0 * dwin
# 归一化处理
mean_val = np.mean(h)
if abs(mean_val) > 1e-15:
h /= mean_val
return h
def Gethrrp_v2shang(self, x):
sum_sq = self.reduce_sum(x**2)
is_nan = sum_sq != sum_sq
# 检查无效情况
is_valid = ops.logical_and(sum_sq > 1e-10, ops.logical_not(is_nan))
# 归一化并计算熵
t = x / (sum_sq)
log_t = ops.log(t)
entropy = -self.reduce_sum(t * log_t)
entropy = entropy.reshape(1)
# 如果无效则返回0
result = ops.where(is_valid, entropy, ops.zeros(1, ms.float32))
result = result.reshape(1)
return result
def Gethrrp(self, input_data, hangmc, liejln):
afftn = Get2intm(hangmc)
# 只取有效数据部分进行加窗处理
windowed_data = input_data[:hangmc, :liejln] * self.window_2d
windowed_data = windowed_data.astype(ms.complex64)
windowed_data_t = ops.transpose(windowed_data, (1, 0))
padding = ((0, 0), (0, afftn - windowed_data_t.shape[1]))
fft_input = ops.Pad(paddings=padding)(windowed_data_t)
fft_ret = self.FFT4f(fft_input, 0)
# 计算幅度平方 - 向量化操作
abs_result = ops.cast(ops.abs(fft_ret), ms.float32)
fd = abs_result**2
# 找每列的峰值位置 - 向量化操作
maxp_indices = ops.argmax(fd, dim=1)
# 提取峰值幅度 - 向量化操作
row_indices = ops.arange(liejln, dtype=ms.int32)
hrrp = fd[row_indices, maxp_indices]
peak_values = fft_ret[row_indices, maxp_indices]
peak_values_2d = ops.expand_dims(peak_values, 0)
result = ops.cat([peak_values_2d, input_data[1:, :]], axis=0)
return hrrp, result
def Keystone_interp(self, input_data, f0, fs, B, moshuzhouqishu):
# 获取维度
fwn = self.fwn
fwn_8 = self.fwn_8
# 计算距离FFT长度
rnfft = self.rnfft
# 模块1: 距离向FFT
out = self.FFT4f(input_data, 1)
nyquist_idx = rnfft//2
new_nyquist_values = (out[:, nyquist_idx-1] + out[:, nyquist_idx+1]) / 2.0
# 使用concat操作替换特定列
left_part = out[:, :nyquist_idx]
nyquist_part = new_nyquist_values.expand_dims(1) # 增加维度
right_part = out[:, nyquist_idx+1:]
# 重新拼接
out = ops.concat([left_part, nyquist_part, right_part], axis=1)
# 计算无小点数量
wuxiaodiansujl = int(np.floor(0.5 * (fs - B) * rnfft / fs))
wuxiaodiansujl = (wuxiaodiansujl >> 3) << 3 # 8字节对齐
wuxiaodiansujl = max(0, wuxiaodiansujl)
# 模块2: 方位向插值
pindian = (ops.arange(0, rnfft, dtype=ms.float32) - 0.5 * rnfft) * fs / rnfft
j_indices = ops.arange(0, fwn, dtype=ms.float32).reshape(-1, 1)
angle = -2 * PI * j_indices * pindian / f0 * moshuzhouqishu
complex_angle = angle.astype(ms.complex64)
j_times_angle = complex_angle * self.j_constant
phase_comp_all = self.exp(j_times_angle)
temp_complex_top = out * phase_comp_all
temp_complex = ops.Pad(((0, fwn_8 - fwn), (0, 0)))(temp_complex_top)
temp_complex_valid = temp_complex[:, wuxiaodiansujl:rnfft - wuxiaodiansujl]
xnew_valid = self.xnew[:, wuxiaodiansujl:rnfft - wuxiaodiansujl]
xold_valid = self.xold[:, wuxiaodiansujl:rnfft - wuxiaodiansujl]
tmp = self.stoltsun(temp_complex_valid, xnew_valid, xold_valid)
update_part = tmp
left_part = out[:fwn, :wuxiaodiansujl]
right_part = out[:fwn, rnfft - wuxiaodiansujl:]
# 在列方向拼接三部分
out = ops.concat([left_part, update_part, right_part], axis=1)
# 模块3: 距离向IFFT
# out = self.cirshift(out, rnfft, rnfft // 2)
out = self.cirshift_optimized(out)
out = self.IFFT4f(out, 0)
return out
def cirshift_optimized(self, input_data):
"""优化后的循环移位"""
# 使用预计算的索引进行gather操作
return self.gather(input_data, self.cirshift_indices, 1)
def cirshift(self, input_data, n, shift):
shift = shift % n
if shift == 0:
return input_data
if input_data.ndim == 1:
# 一维情况
out = ops.concat([input_data[-shift:], input_data[:-shift]])
else:
# 二维情况,沿axis=1循环移位
out = ops.concat([input_data[:, -shift:], input_data[:, :-shift]], axis=1)
return out
def FFT4f(self, input_data, is_inverse):
fft_ret = self.fft(input_data)
if is_inverse:
fft_ret = self.fftshift(fft_ret)
end_time = time.time() * 1000
return fft_ret
def IFFT4f(self, input_data, is_inverse):
ifft_ret = self.ifft(input_data)
if is_inverse:
ifft_ret = self.fftshift(ifft_ret)
return ifft_ret
PI = 3.1415926535897
LC = 299.792458
maichongshu = 400
jln = 512
f0 = 9500
fs = 3600.0
B = 3000
PRF = 2000
vdengxiao = 0
mubiaojuli = 700e3
mohuhalfshu = 1
mohuhalfshu = (int)((15 * 2 / (LC / f0) / PRF) * 1.5) + 1
mohuhalfshu = 5
rfftn = Get2intm(jln)
afftn = Get2intm(maichongshu)
datar, datai = read_hrrp_data("hrrp_400_512.dat", maichongshu, jln, rfftn)
input_data = np.zeros(datar.shape, dtype=np.complex64)
input_data = datar + 1j * datai
input_tensor = ms.Tensor(input_data)
print(input_data.shape)
print(input_tensor)
model = Hrrp(maichongshu, jln, f0, fs, B, mohuhalfshu, PRF, vdengxiao, mubiaojuli)
out = model(input_tensor)
out = out.asnumpy()
dr_out = out.real
di_out = out.imag
write_hrrp_binary_v2("hrrp_out_400_512_py.dat", dr_out, di_out, maichongshu, jln)
print(out.shape)
print(out)
ms.export(model, input_tensor, file_name="hrrp_400_512", file_format='MINDIR')
```